Make FtpsiteAgent FileHandling aware

It emits file_pointers for changed files and writes received events to the FTP server.

Dominik Sander 9 年之前
父節點
當前提交
9d1bfeb778

+ 96 - 28
app/models/agents/ftpsite_agent.rb

@@ -3,23 +3,42 @@ require 'time'
3 3
 
4 4
 module Agents
5 5
   class FtpsiteAgent < Agent
6
-    cannot_receive_events!
6
+    include FileHandling
7 7
     default_schedule "every_12h"
8 8
 
9 9
     gem_dependency_check { defined?(Net::FTP) && defined?(Net::FTP::List) }
10 10
 
11
-    description <<-MD
12
-      The FTP Site Agent checks an FTP site and creates Events based on newly uploaded files in a directory.
11
+    emits_file_pointer!
13 12
 
14
-      #{'## Include `net-ftp-list` in your Gemfile to use this Agent!' if dependencies_missing?}
13
+    description do
14
+      <<-MD
15
+        The Ftp Site Agent checks an FTP site and creates Events based on newly uploaded files in a directory. When receiving events it creates files on the configured FTP server.
15 16
 
17
+        #{'## Include `net-ftp-list` in your Gemfile to use this Agent!' if dependencies_missing?}
16 18
 
17
-      Specify a `url` that represents a directory of an FTP site to watch, and a list of `patterns` to match against file names.
19
+        `mode` must be present and either `read` or `write`, in `read` mode the agent checks the FTP site for changed files, with `write` it writes received events to a file on the server.
18 20
 
19
-      Login credentials can be included in `url` if authentication is required.
21
+        ### Universal options
20 22
 
21
-      Only files with a last modification time later than the `after` value, if specifed, are notified.
22
-    MD
23
+        Specify a `url` that represents a directory of an FTP site to watch, and a list of `patterns` to match against file names.
24
+
25
+        Login credentials can be included in `url` if authentication is required: `ftp://username:password@ftp.example.com/path`. Liquid formatting is supported as well: `ftp://{% credential ftp_credentials %}@ftp.example.com/`
26
+
27
+        Optionally specify the encoding of the files you want to read/write in `force_encoding`, by default UTF-8 is used.
28
+
29
+        ### Reading
30
+
31
+        Only files with a last modification time later than the `after` value, if specifed, are emitted as event.
32
+
33
+        ### Writing
34
+
35
+        Specify the filename to use in `filename`, Liquid interpolation is possible to change the name per event.
36
+
37
+        Use [Liquid](https://github.com/cantino/huginn/wiki/Formatting-Events-using-Liquid) templating in `data` to specify which part of the received event should be written.
38
+
39
+        #{emitting_file_handling_agent_description}
40
+      MD
41
+    end
23 42
 
24 43
     event_description <<-MD
25 44
       Events look like this:
@@ -32,42 +51,67 @@ module Agents
32 51
     MD
33 52
 
34 53
     def working?
35
-      event_created_within?(interpolated['expected_update_period_in_days']) && !recent_error_logs?
54
+      if interpolated['mode'] == 'read'
55
+        event_created_within?(interpolated['expected_update_period_in_days']) && !recent_error_logs?
56
+      else
57
+        received_event_without_error?
58
+      end
36 59
     end
37 60
 
38 61
     def default_options
39 62
       {
63
+        'mode' => 'read',
40 64
         'expected_update_period_in_days' => "1",
41 65
         'url' => "ftp://example.org/pub/releases/",
42 66
         'patterns' => [
43 67
           'foo-*.tar.gz',
44 68
         ],
45 69
         'after' => Time.now.iso8601,
70
+        'force_encoding' => '',
71
+        'filename' => '',
72
+        'data' => '{{ data }}'
46 73
       }
47 74
     end
48 75
 
49 76
     def validate_options
50 77
       # Check for required fields
51 78
       begin
52
-        url = options['url']
53
-        String === url or raise
54
-        uri = URI(url)
55
-        URI::FTP === uri or raise
56
-        errors.add(:base, "url must end with a slash") unless uri.path.end_with?('/')
79
+        if !options['url'].include?('{{')
80
+          url = interpolated['url']
81
+          String === url or raise
82
+          uri = URI(url)
83
+          URI::FTP === uri or raise
84
+          errors.add(:base, "url must end with a slash") if uri.path.present? && !uri.path.end_with?('/')
85
+        end
57 86
       rescue
58 87
         errors.add(:base, "url must be a valid FTP URL")
59 88
       end
60 89
 
61
-      patterns = options['patterns']
62
-      case patterns
63
-      when Array
64
-        if patterns.empty?
65
-          errors.add(:base, "patterns must not be empty")
90
+      options['mode'] = 'read' if options['mode'].blank? && new_record?
91
+      if options['mode'].blank? || !['read', 'write'].include?(options['mode'])
92
+        errors.add(:base, "The 'mode' option is required and must be set to 'read' or 'write'")
93
+      end
94
+
95
+      case interpolated['mode']
96
+      when 'read'
97
+        patterns = options['patterns']
98
+        case patterns
99
+        when Array
100
+          if patterns.empty?
101
+            errors.add(:base, "patterns must not be empty")
102
+          end
103
+        when nil, ''
104
+          errors.add(:base, "patterns must be specified")
105
+        else
106
+          errors.add(:base, "patterns must be an array")
107
+        end
108
+      when 'write'
109
+        if options['filename'].blank?
110
+          errors.add(:base, "filename must be specified in 'write' mode")
111
+        end
112
+        if options['data'].blank?
113
+          errors.add(:base, "data must be specified in 'write' mode")
66 114
         end
67
-      when nil, ''
68
-        errors.add(:base, "patterns must be specified")
69
-      else
70
-        errors.add(:base, "patterns must be an array")
71 115
       end
72 116
 
73 117
       # Check for optional fields
@@ -85,6 +129,7 @@ module Agents
85 129
     end
86 130
 
87 131
     def check
132
+      return if interpolated['mode'] != 'read'
88 133
       saving_entries do |found|
89 134
         each_entry { |filename, mtime|
90 135
           found[filename, mtime]
@@ -92,6 +137,17 @@ module Agents
92 137
       end
93 138
     end
94 139
 
140
+    def receive(incoming_events)
141
+      return if interpolated['mode'] != 'write'
142
+      incoming_events.each do |event|
143
+        mo = interpolated(event)
144
+        mo['data'].encode!(interpolated['force_encoding'], invalid: :replace, undef: :replace) if interpolated['force_encoding'].present?
145
+        open_ftp(base_uri) do |ftp|
146
+          ftp.storbinary("STOR #{mo['filename']}", StringIO.new(mo['data']), Net::FTP::DEFAULT_BLOCKSIZE)
147
+        end
148
+      end
149
+    end
150
+
95 151
     def each_entry
96 152
       patterns = interpolated['patterns']
97 153
 
@@ -147,9 +203,10 @@ module Agents
147 203
 
148 204
       ftp.passive = true
149 205
 
150
-      path = uri.path.chomp('/')
151
-      log "Changing directory to #{path}"
152
-      ftp.chdir(path)
206
+      if (path = uri.path.chomp('/')).present?
207
+        log "Changing directory to #{path}"
208
+        ftp.chdir(path)
209
+      end
153 210
 
154 211
       yield ftp
155 212
     ensure
@@ -176,17 +233,28 @@ module Agents
176 233
       new_files.sort_by { |filename|
177 234
         found_entries[filename]
178 235
       }.each { |filename|
179
-        create_event payload: {
236
+        create_event payload: get_file_pointer(filename).merge({
180 237
           'url' => (base_uri + uri_path_escape(filename)).to_s,
181 238
           'filename' => filename,
182 239
           'timestamp' => found_entries[filename],
183
-        }
240
+        })
184 241
       }
185 242
 
186 243
       memory['known_entries'] = found_entries
187 244
       save!
188 245
     end
189 246
 
247
+    def get_io(file)
248
+      data = StringIO.new
249
+      open_ftp(base_uri) do |ftp|
250
+        ftp.getbinaryfile(file, nil) do |chunk|
251
+          data.write chunk.force_encoding(options['force_encoding'].presence || 'UTF-8')
252
+        end
253
+      end
254
+      data.rewind
255
+      data
256
+    end
257
+
190 258
     private
191 259
 
192 260
     def is_positive_integer?(value)

+ 15 - 0
db/migrate/20160224120316_add_mode_option_to_ftpsite_agents.rb

@@ -0,0 +1,15 @@
1
+class AddModeOptionToFtpsiteAgents < ActiveRecord::Migration
2
+  def up
3
+    Agents::FtpsiteAgent.find_each do |agent|
4
+      agent.options['mode'] = 'read'
5
+      agent.save!(validate: false)
6
+    end
7
+  end
8
+
9
+  def down
10
+    Agents::FtpsiteAgent.find_each do |agent|
11
+      agent.options.delete 'mode'
12
+      agent.save!(validate: false)
13
+    end
14
+  end
15
+end

+ 143 - 0
spec/models/agents/ftpsite_agent_spec.rb

@@ -8,12 +8,74 @@ describe Agents::FtpsiteAgent do
8 8
         'expected_update_period_in_days' => 1,
9 9
         'url' => "ftp://ftp.example.org/pub/releases/",
10 10
         'patterns' => ["example*.tar.gz"],
11
+        'mode' => 'read',
12
+        'filename' => 'test',
13
+        'data' => '{{ data }}'
11 14
       }
12 15
       @checker = Agents::FtpsiteAgent.new(:name => "Example", :options => @site, :keep_events_for => 2.days)
13 16
       @checker.user = users(:bob)
14 17
       @checker.save!
15 18
     end
16 19
 
20
+    context "#validate_options" do
21
+      it "requires url to be a valid URI" do
22
+        @checker.options['url'] = 'not_valid'
23
+        expect(@checker).not_to be_valid
24
+      end
25
+
26
+      it "allows an URI without a path" do
27
+        @checker.options['url'] = 'ftp://ftp.example.org'
28
+        expect(@checker).to be_valid
29
+      end
30
+
31
+      it "does not check the url when liquid output markup is used" do
32
+        @checker.options['url'] = 'ftp://{{ ftp_host }}'
33
+        expect(@checker).to be_valid
34
+      end
35
+
36
+      it "requires patterns to be present and not empty array" do
37
+        @checker.options['patterns'] = ''
38
+        expect(@checker).not_to be_valid
39
+        @checker.options['patterns'] = 'not an array'
40
+        expect(@checker).not_to be_valid
41
+        @checker.options['patterns'] = []
42
+        expect(@checker).not_to be_valid
43
+      end
44
+
45
+      it "when present timestamp must be parsable into a Time object instance" do
46
+        @checker.options['timestamp'] = '2015-01-01 00:00:01'
47
+        expect(@checker).to be_valid
48
+        @checker.options['timestamp'] = 'error'
49
+        expect(@checker).not_to be_valid
50
+      end
51
+
52
+      it "requires mode to be set to 'read' or 'write'" do
53
+        @checker.options['mode'] = 'write'
54
+        expect(@checker).to be_valid
55
+        @checker.options['mode'] = ''
56
+        expect(@checker).not_to be_valid
57
+      end
58
+
59
+      it 'automatically sets mode to read when the agent is a new record' do
60
+        checker = Agents::FtpsiteAgent.new(name: 'test', options: @site.except('mode'))
61
+        checker.user = users(:bob)
62
+        expect(checker).to be_valid
63
+        expect(checker.options['mode']).to eq('read')
64
+      end
65
+
66
+      it "requires 'filename' in 'write' mode" do
67
+        @checker.options['mode'] = 'write'
68
+        @checker.options['filename'] = ''
69
+        expect(@checker).not_to be_valid
70
+      end
71
+
72
+      it "requires 'data' in 'write' mode" do
73
+        @checker.options['mode'] = 'write'
74
+        @checker.options['data'] = ''
75
+        expect(@checker).not_to be_valid
76
+      end
77
+    end
78
+
17 79
     describe "#check" do
18 80
 
19 81
       before do
@@ -42,6 +104,7 @@ describe Agents::FtpsiteAgent do
42 104
         }
43 105
 
44 106
         expect(Event.last(2).first.payload).to eq({
107
+          'file_pointer' => { 'file' => 'example-1.1.tar.gz', 'agent_id' => @checker.id },
45 108
           'url' => 'ftp://ftp.example.org/pub/releases/example-1.1.tar.gz',
46 109
           'filename' => 'example-1.1.tar.gz',
47 110
           'timestamp' => '2014-04-01T10:00:00Z',
@@ -71,12 +134,14 @@ describe Agents::FtpsiteAgent do
71 134
         }
72 135
 
73 136
         expect(Event.last(2).first.payload).to eq({
137
+          'file_pointer' => { 'file' => 'example-1.2.tar.gz', 'agent_id' => @checker.id },
74 138
           'url' => 'ftp://ftp.example.org/pub/releases/example-1.2.tar.gz',
75 139
           'filename' => 'example-1.2.tar.gz',
76 140
           'timestamp' => '2014-04-02T10:00:00Z',
77 141
         })
78 142
 
79 143
         expect(Event.last.payload).to eq({
144
+          'file_pointer' => { 'file' => 'example latest.tar.gz', 'agent_id' => @checker.id },
80 145
           'url' => 'ftp://ftp.example.org/pub/releases/example%20latest.tar.gz',
81 146
           'filename' => 'example latest.tar.gz',
82 147
           'timestamp' => '2014-04-02T10:00:01Z',
@@ -113,5 +178,83 @@ describe Agents::FtpsiteAgent do
113 178
       end
114 179
     end
115 180
 
181
+    context "#open_ftp" do
182
+      before(:each) do
183
+        @ftp_mock = mock()
184
+        mock(@ftp_mock).close
185
+        mock(@ftp_mock).connect('ftp.example.org', 21)
186
+        mock(@ftp_mock).passive=(true)
187
+        mock(Net::FTP).new { @ftp_mock }
188
+      end
189
+      context 'with_path' do
190
+        before(:each) { mock(@ftp_mock).chdir('pub/releases') }
191
+
192
+        it "logs in as anonymous when no user and password are given" do
193
+          mock(@ftp_mock).login('anonymous', 'anonymous@')
194
+          expect { |b| @checker.open_ftp(@checker.base_uri, &b) }.to yield_with_args(@ftp_mock)
195
+        end
196
+
197
+        it "passes the provided user and password" do
198
+          @checker.options['url'] = "ftp://user:password@ftp.example.org/pub/releases/"
199
+          mock(@ftp_mock).login('user', 'password')
200
+          expect { |b| @checker.open_ftp(@checker.base_uri, &b) }.to yield_with_args(@ftp_mock)
201
+        end
202
+      end
203
+
204
+      it "does not call chdir when no path is given" do
205
+        @checker.options['url'] = "ftp://ftp.example.org/"
206
+        mock(@ftp_mock).login('anonymous', 'anonymous@')
207
+        expect { |b| @checker.open_ftp(@checker.base_uri, &b) }.to yield_with_args(@ftp_mock)
208
+      end
209
+    end
210
+
211
+    context "#get_io" do
212
+      it "returns the contents of the file" do
213
+        ftp_mock= mock()
214
+        mock(ftp_mock).getbinaryfile('file', nil).yields('data')
215
+        mock(@checker).open_ftp(@checker.base_uri).yields(ftp_mock)
216
+        expect(@checker.get_io('file').read).to eq('data')
217
+      end
218
+
219
+      it "uses the encoding specified in force_encoding to convert the data to UTF-8" do
220
+        ftp_mock= mock()
221
+        mock(ftp_mock).getbinaryfile('file', nil).yields('ümlaut'.force_encoding('ISO-8859-15'))
222
+        mock(@checker).open_ftp(@checker.base_uri).yields(ftp_mock)
223
+        expect(@checker.get_io('file').read).to eq('ümlaut')
224
+      end
225
+
226
+      it "returns an empty StringIO instance when no data was read" do
227
+        ftp_mock= mock()
228
+        mock(ftp_mock).getbinaryfile('file', nil)
229
+        mock(@checker).open_ftp(@checker.base_uri).yields(ftp_mock)
230
+        expect(@checker.get_io('file').length).to eq(0)
231
+      end
232
+    end
233
+
234
+    context "#receive" do
235
+      before(:each) do
236
+        @checker.options['mode'] = 'write'
237
+        @checker.options['filename'] = 'file.txt'
238
+        @checker.options['data'] = '{{ data }}'
239
+        @ftp_mock= mock()
240
+        @stringio = StringIO.new()
241
+        mock(@checker).open_ftp(@checker.base_uri).yields(@ftp_mock)
242
+      end
243
+
244
+      it "writes the data at data into a file" do
245
+        mock(StringIO).new('hello world🔥') { @stringio }
246
+        mock(@ftp_mock).storbinary('STOR file.txt', @stringio, Net::FTP::DEFAULT_BLOCKSIZE)
247
+        event = Event.new(payload: {'data' => 'hello world🔥'})
248
+        @checker.receive([event])
249
+      end
250
+
251
+      it "converts the string encoding when force_encoding is specified" do
252
+        @checker.options['force_encoding'] = 'ISO-8859-1'
253
+        mock(StringIO).new('hello world?') { @stringio }
254
+        mock(@ftp_mock).storbinary('STOR file.txt', @stringio, Net::FTP::DEFAULT_BLOCKSIZE)
255
+        event = Event.new(payload: {'data' => 'hello world🔥'})
256
+        @checker.receive([event])
257
+      end
258
+    end
116 259
   end
117 260
 end